package com.study.flink.java.day04_async.sink.redis;

import com.study.flink.java.day04_async.ActivityBean;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Maps an {@link ActivityBean} onto the Redis command, key and value used by
 * the Flink Redis sink.
 *
 * <p>Every record is written with {@code HSET} under the additional key
 * {@code "ACT_COUNT"}. NOTE(review): in the connector's HSET convention the
 * additional key is presumably the hash name and the per-record key the hash
 * field — confirm against the connector version in use.
 */
public class RedisActivityBeanMapper implements RedisMapper<ActivityBean> {

    /** Additional key handed to the HSET command description. */
    private static final String ADDITIONAL_KEY = "ACT_COUNT";

    /** Delimiter placed between the parts of the per-record key. */
    private static final String KEY_DELIMITER = "_";

    /**
     * Describes the Redis write operation used for every record.
     *
     * @return an {@code HSET} command description carrying {@code "ACT_COUNT"}
     *         as its additional key
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        RedisCommandDescription description =
                new RedisCommandDescription(RedisCommand.HSET, ADDITIONAL_KEY);
        return description;
    }

    /**
     * Builds the key for a record.
     *
     * @param bean the record being written
     * @return {@code aid}, {@code eventType} and {@code province} joined by
     *         underscores, in that order
     */
    @Override
    public String getKeyFromData(ActivityBean bean) {
        String key = bean.aid + KEY_DELIMITER + bean.eventType + KEY_DELIMITER + bean.province;
        return key;
    }

    /**
     * Builds the value for a record.
     *
     * @param bean the record being written
     * @return the string form of the record's {@code count}
     */
    @Override
    public String getValueFromData(ActivityBean bean) {
        String value = String.valueOf(bean.count);
        return value;
    }

}
